ローカルファイルに対してカラム操作とファイル出力をしてみた | Luigi Advent Calendar 2016 #06
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』6日目の内容となります。
今回はローカルファイルを参照していきたいと思います。
先日5日目は複数のローカルファイルを参照してみるでした。
前回・前々回とファイルの参照を中心にやってきましたが、今回は参照したファイルの加工と加工した結果の出力を行いたいと思います。
下準備
前回と同様のTSVファイルを用います。
1 Customer#000000001 IVhzIApeRb MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING 2 Customer#000000002 XSTf4,NCwDVaWNe6tE JORDAN 1 JORDAN MIDDLE EAST 23-768-687-3665 AUTOMOBILE 3 Customer#000000003 MG9kdTD ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE 4 Customer#000000004 XxVSJsL EGYPT 4 EGYPT MIDDLE EAST 14-128-190-5944 MACHINERY 5 Customer#000000005 KvpyuHCplrB84WgAi CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD 6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx SAUDI ARA2 SAUDI ARABIA MIDDLE EAST 30-114-968-4951 AUTOMOBILE 7 Customer#000000007 TcGe5gaZNgVePxU5kR CHINA 0 CHINA ASIA 28-190-982-9759 AUTOMOBILE 8 Customer#000000008 I0B10bB0AymmC, 0PrRYBC PERU 6 PERU AMERICA 27-147-574-9335 BUILDING 9 Customer#000000009 xKiAFTjUsCuxfele INDIA 6 INDIA ASIA 18-338-906-3675 FURNITURE 10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL ETHIOPIA 9 ETHIOPIA AFRICA 15-741-346-9870 HOUSEHOL
このTSVファイルは以下のような構成になっています。
c_custkey | c_name | c_address | c_city | c_nation | c_region | c_phone | c_mktsegment |
---|---|---|---|---|---|---|---|
1 | Customer#000000001 | IVhzIApeRb | MOROCCO 0 | MOROCCO | AFRICA | 25-989-741-2988 | BUILDING |
2 | Customer#000000002 | XSTf4,NCwDVaWNe6tE | JORDAN 1 | JORDAN | MIDDLE EAST | 23-768-687-3665 | AUTOMOBILE |
3 | Customer#000000003 | MG9kdTD | ARGENTINA7 | ARGENTINA | AMERICA | 11-719-748-3364 | AUTOMOBILE |
4 | Customer#000000004 | XxVSJsL | EGYPT 4 | EGYPT | MIDDLE EAST | 14-128-190-5944 | MACHINERY |
5 | Customer#000000005 | KvpyuHCplrB84WgAi | CANADA 5 | CANADA | AMERICA | 13-750-942-6364 | HOUSEHOLD |
6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx | SAUDI ARA2 | SAUDI ARABIA | MIDDLE EAST | 30-114-968-4951 | AUTOMOBILE |
7 | Customer#000000007 | TcGe5gaZNgVePxU5kR | CHINA 0 | CHINA | ASIA | 28-190-982-9759 | AUTOMOBILE |
8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBC | PERU 6 | PERU | AMERICA | 27-147-574-9335 | BUILDING |
9 | Customer#000000009 | xKiAFTjUsCuxfele | INDIA 6 | INDIA | ASIA | 18-338-906-3675 | FURNITURE |
10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL | ETHIOPIA 9 | ETHIOPIA | AFRICA | 15-741-346-9870 | HOUSEHOLD |
今回の実装では、3番目のカラムであるc_addressを削除し、7番目のカラムのc_phoneをマスキングしたTSVファイルを出力するタスクを実装していきます。
ファイルの加工と出力
ファイルの参照を行うタスクの実装は前回と変更ありません。
パラメータでファイル名を受け取れるようにしておき、
outputメソッドでパラメータで受け取ったファイルを返すといった実装になります。
import luigi class TsvInput(luigi.Task): filename = luigi.Parameter() def output(self): return luigi.LocalTarget(self.filename)
次にファイルの加工についてですが、Luigiの話ではなくPythonではどうするかといった話になります。
実装内容としては下記のようになります。
class TsvColumnEdit(luigi.Task): fileName = luigi.Parameter(default="customer0.tsv") def requires(self): return TsvInput(self.fileName) def output(self): return luigi.LocalTarget("translate_" + self.fileName) def run(self): ## あとで説明するためここでは省略
filenameをluigi.Parameterとすることで、実行時に処理を行うファイル名を指定できるようにしています。
また、今回の処理対象ファイルであるcustomer0.tsvをデフォルトパラメータとして指定しています。
次にrequiresメソッドでファイルを参照しているタスクを依存関係として指定しています。
今回はファイルを出力するタスクのため、outputメソッドを実装しています。
こちらで出力するファイルを指定しています。
カラム削除
実際のタスクの挙動についてはrunメソッドに記載していきます。
そのため、今回のカラム削除や値のマスキングといった処理はすべてrunメソッド内に記載することになります。
def run(self): with self.input().open('r') as input, self.output().open('w') as out_file: for line in input: c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment = line.split('\t') maskedPhone = [] tmpPhoneArray = c_phone.split('-') for i, v in enumerate(tmpPhoneArray): if i == len(tmpPhoneArray) - 1: maskedPhone.append(v) else: maskedPhone.append(re.sub('\d', '*', v)) out_file.write('{}\t{}\t{}\t{}\t{}\t{}\t{}\n'.format(c_custkey, c_name, c_city, c_nation, c_region, '-'.join(maskedPhone), c_mktsegment))
まず、requiresメソッドで参照したファイルとoutputメソッドで出力先として指定したファイルをwithステートメントを用いてオブジェクトを取得しています。
splitメソッドに引数にtab(\t)を指定することで、タブを区切り文字とした配列を取得することができます。
その際にその配列の要素数分変数を用意しておくことで、用意した変数に値が格納されることになります。(4行目)
その後、ファイルに書き出す際に、書き出さない変数を含まずに書き出しています。(13行目)
マスキング
先ほど書いたrunメソッドからマスキングに関する個所のみ抜き出します。
maskedPhone = [] tmpPhoneArray = c_phone.split('-') for i, v in enumerate(tmpPhoneArray): if i == len(tmpPhoneArray) - 1: maskedPhone.append(v) else: maskedPhone.append(re.sub('\d', '*', v))
カラム削除のところで変数に格納した値(c_phone)のマスキングを行います。
マスキング処理をする値はで区切り文字付(-)の電話番号のため、まずは区切り文字毎に分割をします。
これはカラム削除の時に用いたsplitメソッドに引数に区切り文字である"-"を指定することで実現できます。
次に、分割した配列をfor文で処理していきます。
今回は下4桁以外をマスキングするといった処理を実施しました。
for文で処理する際にindexも含めて取得しておき、配列の一番最後の要素以外のときは
正規表現の置き換えで数字をすべて * に置き換えた文字列を配列に追加していきます。
一番最後の要素の際は、そのまま配列に追加しています。
最後に配列を文字列として書き出す際に、joinメソッドを用いることで区切り文字を指定して書き出すことができるので、
-を区切り文字として、マスキングした値を含む配列の中身を出力しています。
最後に実装したタスク全体と、実行結果を記載しておきます。
import luigi import re class TsvInput(luigi.Task): filename = luigi.Parameter() def output(self): return luigi.LocalTarget(self.filename) class TsvColumnEdit(luigi.Task): fileName = luigi.Parameter(default="customer0.tsv") def requires(self): return TsvInput(self.fileName) def output(self): return luigi.LocalTarget("translate_" + self.fileName) def run(self): with self.input().open('r') as input, self.output().open('w') as out_file: for line in input: c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment = line.split('\t') maskedPhone = [] tmpPhoneArray = c_phone.split('-') for i, v in enumerate(tmpPhoneArray): if i == len(tmpPhoneArray) - 1: maskedPhone.append(v) else: maskedPhone.append(re.sub('\d', '*', v)) out_file.write('{}\t{}\t{}\t{}\t{}\t{}\t{}\n'.format(c_custkey, c_name, c_city, c_nation, c_region, '-'.join(maskedPhone), c_mktsegment)) if __name__ == '__main__': luigi.run()
実行結果
$ python ./UseTsv.py --local-scheduler TsvColumnEdit > result.txt DEBUG: Checking if TsvColumnEdit(fileName=customer0.tsv) is complete DEBUG: Checking if TsvInput(filename=customer0.tsv) is complete INFO: Informed scheduler that task TsvColumnEdit_customer0_tsv_41b288805d has status PENDING INFO: Informed scheduler that task TsvInput_customer0_tsv_3630d0d21c has status DONE INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 35607] Worker Worker(salt=365300218, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=35607) running TsvColumnEdit(fileName=customer0.tsv) INFO: [pid 35607] Worker Worker(salt=365300218, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=35607) done TsvColumnEdit(fileName=customer0.tsv) DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task TsvColumnEdit_customer0_tsv_41b288805d has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=365300218, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=35607) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 1 present dependencies were encountered: - 1 TsvInput(filename=customer0.tsv) * 1 ran successfully: - 1 TsvColumnEdit(fileName=customer0.tsv) This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
1 Customer#000000001 MOROCCO 0 MOROCCO AFRICA **-***-***-2988 BUILDING 2 Customer#000000002 JORDAN 1 JORDAN MIDDLE EAST **-***-***-3665 AUTOMOBILE 3 Customer#000000003 ARGENTINA7 ARGENTINA AMERICA **-***-***-3364 AUTOMOBILE 4 Customer#000000004 EGYPT 4 EGYPT MIDDLE EAST **-***-***-5944 MACHINERY 5 Customer#000000005 CANADA 5 CANADA AMERICA **-***-***-6364 HOUSEHOLD 6 Customer#000000006 SAUDI ARA2 SAUDI ARABIA MIDDLE EAST **-***-***-4951 AUTOMOBILE 7 Customer#000000007 CHINA 0 CHINA ASIA **-***-***-9759 AUTOMOBILE 8 Customer#000000008 PERU 6 PERU AMERICA **-***-***-9335 BUILDING 9 Customer#000000009 INDIA 6 INDIA ASIA **-***-***-3675 FURNITURE 10 Customer#000000010 ETHIOPIA 9 ETHIOPIA AFRICA **-***-***-9870 HOUSEHOLD
まとめ
Luigiの実装というよりかはPythonの実装の話になってはいますが、連携したデータに対してマスキング処理をかけることができました。 明日は設定ファイルについて調べてみたいと思います。